#!/usr/bin/env python

import cStringIO,sys

def partitioning_function(key):
    return hash(key)

def write_chunk(wp, reducer, chunk):
    header = "\nBEGCHUNK %9d %9d\n" % (reducer, len(chunk))
    trailer = "\nENDCHUNK\n"
    assert (len(header) == 30), header
    assert (len(trailer) == 10), trailer
    wp.write(header)
    wp.write(chunk)
    wp.write(trailer)

def flush_buffers(wps, n_reducers, sz, flush_all):
    n = len(wps) - 1
    if n == 0: n = 1
    if flush_all:
        max_unflushed = 0
    else:
        max_unflushed = sz / 8
    min_flushed = sz - max_unflushed
    threshold = (max_unflushed + n - 1) / n
    flushed = 0
    unflushed = 0
    for reducer,wp in wps.items():
        chunk = wp.getvalue()
        l = len(chunk)
        if l >= threshold:
            wp.close()
            del wps[reducer]
            write_chunk(sys.stdout, reducer, chunk)
            flushed = flushed + l
        else:
            unflushed = unflushed + l
    assert flushed + unflushed == sz, (flushed, unflushed, sz)
    assert (flushed >= min_flushed), \
        (flushed, min_flushed, unflushed, n_reducers, sz, threshold, flush_all)
    return unflushed

def partition(n_reducers):
    # default buffer size
    bufsz = (10 * 1024 * 1024) / n_reducers
    sz = 0
    wps = {}
    for line in sys.stdin:
        [ key, val ] = line.split(None, 1)
        reducer = partitioning_function(key) % n_reducers
        if not wps.has_key(reducer): 
            wps[reducer] = cStringIO.StringIO()
        wps[reducer].write(line)
        sz = sz + len(line)
        if sz > bufsz:
            sz = flush_buffers(wps, n_reducers, sz, 0)
    flush_buffers(wps, n_reducers, sz, 1)
    assert (len(wps) == 0), wps

def main():
    n_reducers = int(sys.argv[1])
    sys.exit(partition(n_reducers))

if __name__ == "__main__":
    main()
